package cc.shacocloud.mirage.utils;

import io.vertx.core.Future;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

/**
 * 基于{@link Future} 封装的工具类
 */
public class FutureUtils {
    
    
    /**
     * 顺序执行，默认集合开始元素0
     *
     * @see #sequential(List, BiFunction, boolean)
     */
    public static <T, R> Future<R> sequential(T[] arr,
                                              BiFunction<T, AtomicBoolean, Future<R>> fun) {
        return sequential(Arrays.asList(arr), fun, true);
    }
    
    /**
     * 顺序执行，默认集合开始元素0
     *
     * @see #sequential(List, BiFunction, boolean)
     */
    public static <T, R> Future<R> sequential(List<T> list,
                                              BiFunction<T, AtomicBoolean, Future<R>> fun) {
        return sequential(list, fun, true);
    }
    
    
    /**
     * 顺序执行，默认集合开始元素0
     *
     * @see #sequential(List, BiFunction, AtomicInteger, boolean)
     */
    public static <T, R> Future<R> sequential(List<T> list,
                                              BiFunction<T, AtomicBoolean, Future<R>> fun,
                                              boolean direction) {
        return sequential(list, fun, new AtomicInteger(0), direction);
    }
    
    
    /**
     * 顺序执行
     *
     * @param list       等待执行的集合
     * @param fun        集合中元素需要执行的函数，
     *                   函数的第二个入参为 {@link AtomicBoolean} 对象，
     *                   在当前函数执行时如果设置为 {@code true} 则当前函数为最后一个执行函数，后续的不在执行。返回的结果也为该函数的结果
     * @param startIndex 集合的开始索引，索引值最小为 0
     * @param direction  {@code startIndex} 是递增还是递减，如果是{@code true} 则为递增，反之则为递减
     * @param <T>        集合元素泛型
     * @return 最后一个元素执行函数返回的结果
     */
    public static <T, R> Future<R> sequential(List<T> list,
                                              BiFunction<T, AtomicBoolean, Future<R>> fun,
                                              AtomicInteger startIndex,
                                              final boolean direction) {
        if (list == null || list.isEmpty())
            return Future.failedFuture(new IllegalArgumentException("list 不允许为空！"));
        
        if (startIndex.get() >= list.size())
            return Future.failedFuture(new IllegalArgumentException("startIndex 超出 list 的长度！"));
        
        T t = list.get(startIndex.get());
        final AtomicBoolean termination = new AtomicBoolean(false);
        return fun.apply(t, termination).compose(r -> {
            
            // 如果指定为终止，则停止执行
            if (termination.get()) {
                return Future.succeededFuture(r);
            }
            
            if (direction ? startIndex.incrementAndGet() >= list.size() : startIndex.decrementAndGet() < 0) {
                return Future.succeededFuture(r);
            }
            
            return sequential(list, fun, startIndex, direction);
        });
    }
    
    /**
     * 顺序执行
     *
     * @param iterator 迭代器
     * @param fun      集合中元素需要执行的函数，
     *                 函数的第二个入参为 {@link AtomicBoolean} 对象，
     *                 在当前函数执行时如果设置为 {@code true} 则当前函数为最后一个执行函数，后续的不在执行。返回的结果也为该函数的结果
     * @param <T>      集合元素泛型
     * @return 最后一个元素执行函数返回的结果
     */
    public static <T, R> Future<R> sequential(Iterator<T> iterator,
                                              BiFunction<T, AtomicBoolean, Future<R>> fun) {
        if (iterator == null || !iterator.hasNext())
            return Future.failedFuture(new IllegalArgumentException("iterator 不允许为空！"));
        
        T t = iterator.next();
        
        final AtomicBoolean termination = new AtomicBoolean(false);
        return fun.apply(t, termination).compose(r -> {
            
            // 如果指定为终止，则停止执行
            if (termination.get()) {
                return Future.succeededFuture(r);
            }
            
            if (!iterator.hasNext()) {
                return Future.succeededFuture(r);
            }
            
            return sequential(iterator, fun);
        });
    }
    
    /**
     * 等待 {@link Future} 完成，该方法将异步逻辑变为同步。
     * <p>
     * 需要注意的是：该方法不能在 Vertx 的线程中执行，通常是在 main 线程中初始化时使用
     */
    @SuppressWarnings("unchecked")
    @Nullable
    public static <T> T await(@NotNull Future<T> future) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> throwableReference = new AtomicReference<>();
        AtomicReference<Object> resultReference = new AtomicReference<>();
        
        future.onComplete(ar -> {
            if (ar.failed()) {
                throwableReference.set(ar.cause());
            } else {
                T result = ar.result();
                resultReference.set(Objects.isNull(result) ? Empty.INSTANCE : result);
            }
            
            latch.countDown();
        });
        
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("在等待 Future 完成时发生线程中断异常！", e);
        }
        
        Object result = resultReference.get();
        if (Objects.nonNull(result)) return Empty.INSTANCE.equals(result) ? null : (T) result;
        
        throw new RuntimeException(throwableReference.get());
    }
    
}
